package com.xxx.flink.sink;

import com.xxx.flink.customsource.CustomSource;
import com.xxx.flink.pojo.Event;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * Demo Flink job that reads {@link Event} records from {@link CustomSource}
 * and writes them to Redis through a {@link RedisSink}.
 *
 * <p>Every record is emitted with the {@code HSET} command, using the event
 * name as the key and the event URL as the value (see {@link MyRedisMapper}).
 */
public class SinkToRedis {

    // Connection settings of the target Redis instance.
    // NOTE(review): the password is hard-coded in source; consider externalizing it.
    private static final String REDIS_HOST = "localhost";
    private static final int REDIS_PORT = 6379;
    private static final String REDIS_PASSWORD = "123456";
    private static final int REDIS_DATABASE = 0;

    // Additional key handed to the HSET command description.
    private static final String EVENT_HASH_KEY = "event";

    // Name under which the job is submitted.
    private static final String JOB_NAME = "redis";

    /**
     * Builds the source-to-Redis pipeline and runs it.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the job execution fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        // Read the input records.
        final DataStreamSource<Event> events = environment.addSource(new CustomSource());

        // Attach the Redis sink to the stream.
        final RedisSink<Event> redisSink =
                new RedisSink<>(buildRedisConfig(), new MyRedisMapper());
        events.addSink(redisSink);

        environment.execute(JOB_NAME);
    }

    /**
     * Creates the Jedis pool configuration from the connection constants above.
     *
     * @return the configuration passed to the {@link RedisSink}
     */
    private static FlinkJedisPoolConfig buildRedisConfig() {
        final FlinkJedisPoolConfig.Builder builder = new FlinkJedisPoolConfig.Builder();
        builder.setHost(REDIS_HOST);
        builder.setPort(REDIS_PORT);
        builder.setPassword(REDIS_PASSWORD);
        builder.setDatabase(REDIS_DATABASE);
        return builder.build();
    }

    /**
     * Maps an {@link Event} onto an {@code HSET} command: the event name is
     * the key and the event URL is the value.
     */
    public static class MyRedisMapper implements RedisMapper<Event> {

        /**
         * Describes the Redis command issued for each record.
         *
         * @return an {@code HSET} description with {@code "event"} as its additional key
         */
        @Override
        public RedisCommandDescription getCommandDescription() {
            final RedisCommand command = RedisCommand.HSET;
            return new RedisCommandDescription(command, EVENT_HASH_KEY);
        }

        /**
         * Extracts the key for a record.
         *
         * @param record the incoming event
         * @return the event name
         */
        @Override
        public String getKeyFromData(Event record) {
            final String name = record.getName();
            return name;
        }

        /**
         * Extracts the value for a record.
         *
         * @param record the incoming event
         * @return the event URL
         */
        @Override
        public String getValueFromData(Event record) {
            final String url = record.getUrl();
            return url;
        }
    }
}
